package delayqueue

import (
	"github.com/go-redis/redis"
	"github.com/sirupsen/logrus"
	"strconv"
	"time"
)

const (
	// MaxCount is the upper bound that getPollSize caps manager.pollSize at.
	MaxCount     = 10000
	// DefaultCount is the poll size getPollSize falls back to when none is set.
	DefaultCount = 1
	// DefaultSleep is how long consumer waits before polling again after an
	// error or an empty result.
	DefaultSleep = 1 * time.Second
	// PREFIX is prepended to every Redis key built by this package.
	PREFIX       = "delay:"
	// HashKey is the trailing component of the hash key built by getHashKey.
	HashKey      = "msg"
)

// zremScript pops at most one member from a sorted set in a single step.
//
// It runs ZRANGEBYSCORE (scores 0..ARGV[1], limited to one result) followed by
// ZREM on KEYS[1] inside one Lua script, so the lookup and the removal happen
// atomically and multiple goroutines cannot fetch the same task in vain.
// The script returns the removed member, or "" when no member is in range or
// ZREM removed nothing.
var zremScript = redis.NewScript(`
local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1)
if #resultArray > 0 then
	if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then
		return resultArray[1];
	end;
end;
return "";
`)

// manager bundles the state used to store, schedule and consume the delayed
// tasks of a single topic in Redis.
type manager struct {
	// topic is appended to the sorted-set key (see getZSetKey) and used in logs.
	topic       string
	// job is the callback consumer invokes with each fetched task payload.
	job         delayFunc
	// concurrency is not referenced in this part of the file; presumably the
	// number of consumer goroutines to start — TODO confirm against the caller.
	concurrency int
	// retryCount is not referenced in this part of the file — TODO confirm usage.
	retryCount  int32
	// timeout is not referenced in this part of the file — TODO confirm usage.
	timeout     time.Duration
	// pollSize is normalized by getPollSize: 0 becomes DefaultCount and values
	// above MaxCount are capped at MaxCount.
	pollSize    int64
	// startedAt is not referenced in this part of the file — TODO confirm usage.
	startedAt   int64
}

// delayFunc is the signature of the task handler stored in manager.job.
// consumer calls it with the payload string it read back via getTask.
type delayFunc func(member interface{})

// getHashKey returns the Redis key of the hash that holds task payloads.
// The key is built only from package constants, so it is the same for every
// manager regardless of topic.
func (m *manager) getHashKey() string {
	const kind = "hash:"
	return PREFIX + kind + HashKey
}

// getZSetKey returns the Redis key of the sorted set that schedules this
// manager's tasks: PREFIX, then "zset:", then the topic name.
func (m *manager) getZSetKey() string {
	const kind = "zset:"
	return PREFIX + kind + m.topic
}

// saveTask stores value under field in the task hash (HSET) and returns the
// Redis error, if any. The hash key is logged at info level on every call.
func (m *manager) saveTask(field string, value interface{}) error {
	key := m.getHashKey()
	logrus.Info(key)
	cmd := rdb.HSet(key, field, value)
	return cmd.Err()
}

// getTask reads the payload stored under field in the task hash (HGET).
// It returns the stored string together with whatever error Redis reported.
func (m *manager) getTask(field string) (string, error) {
	key := m.getHashKey()
	cmd := rdb.HGet(key, field)
	return cmd.Result()
}

// delTask removes field from the task hash (HDEL) and returns the Redis
// error, if any.
func (m *manager) delTask(field string) error {
	key := m.getHashKey()
	cmd := rdb.HDel(key, field)
	return cmd.Err()
}

// addTask adds the given scored members to this topic's sorted set (ZADD)
// and returns the Redis error, if any.
func (m *manager) addTask(members []redis.Z) error {
	key := m.getZSetKey()
	cmd := rdb.ZAdd(key, members...)
	return cmd.Err()
}

// remTask removes member from this topic's sorted set (ZREM) and returns the
// Redis error, if any.
func (m *manager) remTask(member interface{}) error {
	key := m.getZSetKey()
	cmd := rdb.ZRem(key, member)
	return cmd.Err()
}

// lenTasks reports how many members this topic's sorted set currently holds
// (ZCARD), together with the Redis error, if any.
func (m *manager) lenTasks() (int64, error) {
	key := m.getZSetKey()
	cmd := rdb.ZCard(key)
	return cmd.Result()
}

// getAllTasks returns every member of this topic's sorted set together with
// its score, ordered by ascending score, plus the Redis error, if any.
//
// The range 0..-1 asks Redis for the whole set in one command. This avoids a
// separate ZCARD round trip whose error would otherwise have to be handled and
// whose result could be stale by the time the range query runs.
func (m *manager) getAllTasks() ([]redis.Z, error) {
	return rdb.ZRangeWithScores(m.getZSetKey(), 0, -1).Result()
}

// getPollSize normalizes m.pollSize in place; it does not return a value.
//
// A non-positive size (unset or negative) is replaced with DefaultCount, and a
// size above MaxCount is capped at MaxCount. Values already within
// [1, MaxCount] are left untouched.
func (m *manager) getPollSize() {
	switch {
	case m.pollSize <= 0:
		// Negative sizes are as meaningless as zero, so treat both as "unset".
		m.pollSize = DefaultCount
	case m.pollSize > MaxCount:
		m.pollSize = MaxCount
	}
}

// consumer polls this topic's sorted set in an endless loop, popping one due
// task at a time and passing its payload to m.job. It never returns.
//
// Each iteration runs zremScript with the current Unix time in nanoseconds as
// the upper score bound. If Redis reports an error, the reply is not a string,
// or no task is due, the loop sleeps for DefaultSleep and polls again. When a
// task id was popped but its payload cannot be read from the hash, the failure
// is logged and the loop moves on to the next task without sleeping.
func (m *manager) consumer() {
	logrus.Infof("[%d]%s delay start...", GetGoroutineID(), m.topic)
	for {
		// UnixNano is independent of the time's location, so no Local()
		// conversion is needed before taking it.
		maxScore := strconv.FormatInt(time.Now().UnixNano(), 10)
		reply, err := zremScript.Run(rdb, []string{m.getZSetKey()}, maxScore).Result()
		if err != nil {
			logrus.Errorf("%s delay: pop due task: %v", m.topic, err)
			time.Sleep(DefaultSleep)
			continue
		}

		// Guard the assertion: an unexpected reply type must not panic the
		// goroutine and take the whole process down with it.
		msgID, ok := reply.(string)
		if !ok {
			logrus.Errorf("%s delay: unexpected script reply type %T", m.topic, reply)
			time.Sleep(DefaultSleep)
			continue
		}
		if msgID == "" {
			// Nothing is due yet.
			time.Sleep(DefaultSleep)
			continue
		}

		member, err := m.getTask(msgID)
		if err != nil {
			// The id has already been removed from the sorted set, so this
			// task is lost; make that visible instead of dropping it silently.
			logrus.Errorf("%s delay: load task %q: %v", m.topic, msgID, err)
			continue
		}

		m.job(member)
	}
}
